From 34b11086b69cf8293727bc0c10fca4c28bafdf71 Mon Sep 17 00:00:00 2001 From: "jrb44@swoop.cl.cam.ac.uk" Date: Tue, 22 Mar 2005 15:50:43 +0000 Subject: [PATCH] bitkeeper revision 1.1236.39.1 (42403ed3ayhqDaCmozMDqaFfcmjpzQ) Added asynchronous support to the blockstore. --- BitKeeper/etc/logging_ok | 1 + tools/blktap/blockstore.c | 531 ++++++++++++++++++++++++++++++++------ tools/blktap/blockstore.h | 4 +- 3 files changed, 456 insertions(+), 80 deletions(-) diff --git a/BitKeeper/etc/logging_ok b/BitKeeper/etc/logging_ok index fccd637b33..e872421227 100644 --- a/BitKeeper/etc/logging_ok +++ b/BitKeeper/etc/logging_ok @@ -34,6 +34,7 @@ iap10@pb007.cl.cam.ac.uk iap10@striker.cl.cam.ac.uk iap10@tetris.cl.cam.ac.uk jrb44@plym.cl.cam.ac.uk +jrb44@swoop.cl.cam.ac.uk jws22@gauntlet.cl.cam.ac.uk jws@cairnwell.research kaf24@camelot.eng.3leafnetworks.com diff --git a/tools/blktap/blockstore.c b/tools/blktap/blockstore.c index 079c45576b..dae1470f92 100644 --- a/tools/blktap/blockstore.c +++ b/tools/blktap/blockstore.c @@ -13,31 +13,73 @@ #include #include #include +#include #include "blockstore.h" #define BLOCKSTORE_REMOTE +//#define BSDEBUG -#ifdef BLOCKSTORE_REMOTE +/***************************************************************************** + * Debugging + */ +#ifdef BSDEBUG +void DB(char *format, ...) +{ + va_list args; + + va_start(args, format); + vfprintf(stderr, format, args); + va_end(args); +} +#else +#define DB(format, ...) (void)0 +#endif -//#define BSDEBUG +#ifdef BLOCKSTORE_REMOTE #include #include #include #include -#define ENTER_QUEUE_CR (void)0 -#define LEAVE_QUEUE_CR (void)0 +/***************************************************************************** + * * + *****************************************************************************/ + +/***************************************************************************** + * Network state * + *****************************************************************************/ +/* The individual disk servers we talks to. These will be referenced by + * an integer index into bsservers[]. + */ bsserver_t bsservers[MAX_SERVERS]; + +/* The cluster map. This is indexed by an integer cluster number. + */ bscluster_t bsclusters[MAX_CLUSTERS]; +/* Local socket. + */ struct sockaddr_in sin_local; int bssock = 0; +/***************************************************************************** + * Message queue management * + *****************************************************************************/ + +/* Protects the queue manipulation critcal regions. + */ +#define ENTER_QUEUE_CR (void)0 +#define LEAVE_QUEUE_CR (void)0 + +/* A message queue entry. We allocate one of these for every request we send. + * Asynchronous reply reception also used one of these. + */ typedef struct bsq_t_struct { struct bsq_t_struct *prev; struct bsq_t_struct *next; + int status; int server; int length; struct msghdr msghdr; @@ -46,8 +88,134 @@ typedef struct bsq_t_struct { void *block; } bsq_t; +#define BSQ_STATUS_MATCHED 1 + +#define ENTER_LUID_CR (void)0 +#define LEAVE_LUID_CR (void)0 + +static u64 luid_cnt = 0x1000ULL; +u64 new_luid(void) { + u64 luid; + ENTER_LUID_CR; + luid = luid_cnt++; + LEAVE_LUID_CR; + return luid; +} + +/* Queue of outstanding requests. + */ bsq_t *bs_head = NULL; bsq_t *bs_tail = NULL; +int bs_qlen = 0; + +/* + */ +void queuedebug(char *msg) { + bsq_t *q; + ENTER_QUEUE_CR; + fprintf(stderr, "Q: %s len=%u\n", msg, bs_qlen); + for (q = bs_head; q; q = q->next) { + fprintf(stderr, " luid=%016llx server=%u\n", + q->message.luid, q->server); + } + LEAVE_QUEUE_CR; +} + +int enqueue(bsq_t *qe) { + ENTER_QUEUE_CR; + qe->next = NULL; + qe->prev = bs_tail; + if (!bs_head) + bs_head = qe; + else + bs_tail->next = qe; + bs_tail = qe; + bs_qlen++; + LEAVE_QUEUE_CR; +#ifdef BSDEBUG + queuedebug("enqueue"); +#endif + return 0; +} + +int dequeue(bsq_t *qe) { + bsq_t *q; + ENTER_QUEUE_CR; + for (q = bs_head; q; q = q->next) { + if (q == qe) { + if (q->prev) + q->prev->next = q->next; + else + bs_head = q->next; + if (q->next) + q->next->prev = q->prev; + else + bs_tail = q->prev; + bs_qlen--; + goto found; + } + } + + LEAVE_QUEUE_CR; +#ifdef BSDEBUG + queuedebug("dequeue not found"); +#endif + return 0; + + found: + LEAVE_QUEUE_CR; +#ifdef BSDEBUG + queuedebug("dequeue not found"); +#endif + return 1; +} + +bsq_t *queuesearch(bsq_t *qe) { + bsq_t *q; + ENTER_QUEUE_CR; + for (q = bs_head; q; q = q->next) { + if ((qe->server == q->server) && + (qe->message.operation == q->message.operation) && + (qe->message.luid == q->message.luid)) { + + if ((q->message.operation == BSOP_READBLOCK) && + ((q->message.flags & BSOP_FLAG_ERROR) == 0)) { + q->block = qe->block; + qe->block = NULL; + } + q->length = qe->length; + q->message.flags = qe->message.flags; + q->message.id = qe->message.id; + q->status |= BSQ_STATUS_MATCHED; + + if (q->prev) + q->prev->next = q->next; + else + bs_head = q->next; + if (q->next) + q->next->prev = q->prev; + else + bs_tail = q->prev; + q->next = NULL; + q->prev = NULL; + bs_qlen--; + goto found; + } + } + + LEAVE_QUEUE_CR; +#ifdef BSDEBUG + queuedebug("queuesearch not found"); +#endif + return NULL; + + found: + LEAVE_QUEUE_CR; +#ifdef BSDEBUG + queuedebug("queuesearch found"); +#endif + return q; +} int send_message(bsq_t *qe) { int rc; @@ -71,16 +239,21 @@ int send_message(bsq_t *qe) { qe->iov[1].iov_len = BLOCK_SIZE; } - rc = sendmsg(bssock, &(qe->msghdr), 0); + qe->message.luid = new_luid(); + + qe->status = 0; + if (enqueue(qe) < 0) { + fprintf(stderr, "Error enqueuing request.\n"); + return -1; + } + + DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid); + rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT); //rc = sendto(bssock, (void *)&(qe->message), qe->length, 0, // (struct sockaddr *)&(bsservers[qe->server].sin), // sizeof(struct sockaddr_in)); if (rc < 0) return rc; - - ENTER_QUEUE_CR; - - LEAVE_QUEUE_CR; return rc; } @@ -115,22 +288,148 @@ int recv_message(bsq_t *qe) { return rc; } +int get_server_number(struct sockaddr_in *sin) { + int i; + +#ifdef BSDEBUG2 + fprintf(stderr, + "get_server_number(%u.%u.%u.%u/%u)\n", + (unsigned int)sin->sin_addr.s_addr & 0xff, + ((unsigned int)sin->sin_addr.s_addr >> 8) & 0xff, + ((unsigned int)sin->sin_addr.s_addr >> 16) & 0xff, + ((unsigned int)sin->sin_addr.s_addr >> 24) & 0xff, + (unsigned int)sin->sin_port); +#endif + + for (i = 0; i < MAX_SERVERS; i++) { + if (bsservers[i].hostname) { +#ifdef BSDEBUG2 + fprintf(stderr, + "get_server_number check %u.%u.%u.%u/%u\n", + (unsigned int)bsservers[i].sin.sin_addr.s_addr&0xff, + ((unsigned int)bsservers[i].sin.sin_addr.s_addr >> 8)&0xff, + ((unsigned int)bsservers[i].sin.sin_addr.s_addr >> 16)&0xff, + ((unsigned int)bsservers[i].sin.sin_addr.s_addr >> 24)&0xff, + (unsigned int)bsservers[i].sin.sin_port); +#endif + if ((sin->sin_family == bsservers[i].sin.sin_family) && + (sin->sin_port == bsservers[i].sin.sin_port) && + (memcmp((void *)&(sin->sin_addr), + (void *)&(bsservers[i].sin.sin_addr), + sizeof(struct in_addr)) == 0)) { + return i; + } + } + } + + return -1; +} + +void *rx_buffer = NULL; +bsq_t rx_qe; +bsq_t *recv_any(void) { + struct sockaddr_in from; + int rc; + + DB("ENTER recv_any\n"); + + rx_qe.msghdr.msg_name = &from; + rx_qe.msghdr.msg_namelen = sizeof(struct sockaddr_in); + rx_qe.msghdr.msg_iov = rx_qe.iov; + if (!rx_buffer) { + rx_buffer = malloc(BLOCK_SIZE); + if (!rx_buffer) { + perror("recv_any malloc"); + return NULL; + } + } + rx_qe.block = rx_buffer; + rx_buffer = NULL; + rx_qe.msghdr.msg_iovlen = 2; + rx_qe.msghdr.msg_control = NULL; + rx_qe.msghdr.msg_controllen = 0; + rx_qe.msghdr.msg_flags = 0; + + rx_qe.iov[0].iov_base = (void *)&(rx_qe.message); + rx_qe.iov[0].iov_len = MSGBUFSIZE_ID; + rx_qe.iov[1].iov_base = rx_qe.block; + rx_qe.iov[1].iov_len = BLOCK_SIZE; + + rc = recvmsg(bssock, &(rx_qe.msghdr), 0); + if (rc < 0) { + perror("recv_any"); + return NULL; + } + rx_qe.length = rc; + rx_qe.server = get_server_number(&from); + + DB("recv_any from %d luid=%016llx len=%u\n", + rx_qe.server, rx_qe.message.luid, rx_qe.length); + + return &rx_qe; +} + +void recv_recycle_buffer(bsq_t *q) { + if (q->block) { + rx_buffer = q->block; + q->block = NULL; + } +} + +// cycle through reading any incoming, searching for a match in the +// queue, until we have all we need. +int wait_recv(bsq_t **reqs, int numreqs) { + bsq_t *q, *m; + unsigned int x, i; + + DB("ENTER wait_recv %u\n", numreqs); + + checkmatch: + x = 0xffffffff; + for (i = 0; i < numreqs; i++) { + x &= reqs[i]->status; + } + if ((x & BSQ_STATUS_MATCHED)) { + DB("LEAVE wait_recv\n"); + return numreqs; + } + + rxagain: + q = recv_any(); + if (!q) + return -1; + + m = queuesearch(q); + recv_recycle_buffer(q); + if (!m) { + fprintf(stderr, "Unmatched RX\n"); + goto rxagain; + } + + goto checkmatch; + +} + void *readblock_indiv(int server, u64 id) { void *block; bsq_t *qe; - int len; + int len, rc; qe = (bsq_t *)malloc(sizeof(bsq_t)); if (!qe) { perror("readblock qe malloc"); return NULL; } + qe->block = NULL; + + /* qe->block = malloc(BLOCK_SIZE); if (!qe->block) { perror("readblock qe malloc"); free((void *)qe); return NULL; } + */ qe->server = server; @@ -144,31 +443,40 @@ void *readblock_indiv(int server, u64 id) { goto err; } - len = recv_message(qe); + /*len = recv_message(qe); if (len < 0) { perror("readblock recv"); goto err; + }*/ + + rc = wait_recv(&qe, 1); + if (rc < 0) { + perror("readblock recv"); + goto err; } + if ((qe->message.flags & BSOP_FLAG_ERROR)) { fprintf(stderr, "readblock server error\n"); goto err; } - if (len < MSGBUFSIZE_BLOCK) { + if (qe->length < MSGBUFSIZE_BLOCK) { fprintf(stderr, "readblock recv short (%u)\n", len); goto err; } - if ((block = malloc(BLOCK_SIZE)) == NULL) { + /* if ((block = malloc(BLOCK_SIZE)) == NULL) { perror("readblock malloc"); goto err; } - //memcpy(block, qe->message.block, BLOCK_SIZE); + memcpy(block, qe->message.block, BLOCK_SIZE); + */ block = qe->block; free((void *)qe); return block; err: - free(qe->block); + if (qe->block) + free(qe->block); free((void *)qe); return NULL; } @@ -229,7 +537,8 @@ void *readblock(u64 id) { return block; } -int writeblock_indiv(int server, u64 id, void *block) { +bsq_t *writeblock_indiv(int server, u64 id, void *block) { + bsq_t *qe; int len; @@ -251,28 +560,14 @@ int writeblock_indiv(int server, u64 id, void *block) { perror("writeblock sendto"); goto err; } - - len = recv_message(qe); - if (len < 0) { - perror("writeblock recv"); - goto err; - } - if ((qe->message.flags & BSOP_FLAG_ERROR)) { - fprintf(stderr, "writeblock server error\n"); - goto err; - } - if (len < MSGBUFSIZE_ID) { - fprintf(stderr, "writeblock recv short (%u)\n", len); - goto err; - } - free((void *)qe); - return 0; + return qe; err: free((void *)qe); - return -1; + return NULL; } + /** * writeblock: write an existing block to disk @@ -282,11 +577,15 @@ int writeblock_indiv(int server, u64 id, void *block) { * @return: zero on success, -1 on failure */ int writeblock(u64 id, void *block) { - int map = (int)BSID_MAP(id); + int map = (int)BSID_MAP(id); int rep0 = bsclusters[map].servers[0]; int rep1 = bsclusters[map].servers[1]; int rep2 = bsclusters[map].servers[2]; + bsq_t *reqs[3]; + int rc; + + reqs[0] = reqs[1] = reqs[2] = NULL; #ifdef BSDEBUG fprintf(stderr, @@ -302,20 +601,65 @@ int writeblock(u64 id, void *block) { (unsigned int)((unsigned char *)block)[7]); #endif -/* special case for the "superblock" just use the first block on the + /* special case for the "superblock" just use the first block on the * first replica. (extend to blocks < 6 for vdi bug) */ if (id < 6) { - return writeblock_indiv(rep0, id, block); + reqs[0] = writeblock_indiv(rep0, id, block); + if (!reqs[0]) + return -1; + rc = wait_recv(reqs, 1); + return rc; } - if (writeblock_indiv(rep0, BSID_REPLICA0(id), block) < 0) - return -1; - if (writeblock_indiv(rep1, BSID_REPLICA1(id), block) < 0) - return -1; - if (writeblock_indiv(rep2, BSID_REPLICA2(id), block) < 0) - return -1; + reqs[0] = writeblock_indiv(rep0, BSID_REPLICA0(id), block); + if (!reqs[0]) + goto err; + reqs[1] = writeblock_indiv(rep1, BSID_REPLICA1(id), block); + if (!reqs[1]) + goto err; + reqs[2] = writeblock_indiv(rep2, BSID_REPLICA2(id), block); + if (!reqs[2]) + goto err; + + rc = wait_recv(reqs, 3); + if (rc < 0) { + perror("writeblock recv"); + goto err; + } + if ((reqs[0]->message.flags & BSOP_FLAG_ERROR)) { + fprintf(stderr, "writeblock server0 error\n"); + goto err; + } + if ((reqs[1]->message.flags & BSOP_FLAG_ERROR)) { + fprintf(stderr, "writeblock server1 error\n"); + goto err; + } + if ((reqs[2]->message.flags & BSOP_FLAG_ERROR)) { + fprintf(stderr, "writeblock server2 error\n"); + goto err; + } + + + free((void *)reqs[0]); + free((void *)reqs[1]); + free((void *)reqs[2]); return 0; + + err: + if (reqs[0]) { + dequeue(reqs[0]); + free((void *)reqs[0]); + } + if (reqs[1]) { + dequeue(reqs[1]); + free((void *)reqs[1]); + } + if (reqs[2]) { + dequeue(reqs[2]); + free((void *)reqs[2]); + } + return -1; } /** @@ -328,7 +672,7 @@ u64 allocblock(void *block) { return allocblock_hint(block, 0); } -u64 allocblock_hint_indiv(int server, void *block, u64 hint) { +bsq_t *allocblock_hint_indiv(int server, void *block, u64 hint) { bsq_t *qe; int len; @@ -351,26 +695,11 @@ u64 allocblock_hint_indiv(int server, void *block, u64 hint) { goto err; } - len = recv_message(qe); - if (len < 0) { - perror("allocblock_hint recv"); - goto err; - } - if ((qe->message.flags & BSOP_FLAG_ERROR)) { - fprintf(stderr, "allocblock_hint server error\n"); - goto err; - } - if (len < MSGBUFSIZE_ID) { - fprintf(stderr, "allocblock_hint recv short (%u)\n", len); - goto err; - } - - free((void *)qe); - return qe->message.id; + return qe; err: free((void *)qe); - return 0; + return NULL; } /** @@ -382,22 +711,48 @@ u64 allocblock_hint_indiv(int server, void *block, u64 hint) { */ u64 allocblock_hint(void *block, u64 hint) { int map = (int)hint; - int rep0 = bsclusters[map].servers[0]; int rep1 = bsclusters[map].servers[1]; int rep2 = bsclusters[map].servers[2]; - + bsq_t *reqs[3]; + int rc; u64 id0, id1, id2; - id0 = allocblock_hint_indiv(rep0, block, 0); - if (id0 == 0) - return 0; - id1 = allocblock_hint_indiv(rep1, block, 0); - if (id1 == 0) - return 0; - id2 = allocblock_hint_indiv(rep2, block, 0); - if (id2 == 0) - return 0; + reqs[0] = reqs[1] = reqs[2] = NULL; + + DB("ENTER allocblock\n"); + + reqs[0] = allocblock_hint_indiv(rep0, block, hint); + if (!reqs[0]) + goto err; + reqs[1] = allocblock_hint_indiv(rep1, block, hint); + if (!reqs[1]) + goto err; + reqs[2] = allocblock_hint_indiv(rep2, block, hint); + if (!reqs[2]) + goto err; + + rc = wait_recv(reqs, 3); + if (rc < 0) { + perror("allocblock recv"); + goto err; + } + if ((reqs[0]->message.flags & BSOP_FLAG_ERROR)) { + fprintf(stderr, "allocblock server0 error\n"); + goto err; + } + if ((reqs[1]->message.flags & BSOP_FLAG_ERROR)) { + fprintf(stderr, "allocblock server1 error\n"); + goto err; + } + if ((reqs[2]->message.flags & BSOP_FLAG_ERROR)) { + fprintf(stderr, "allocblock server2 error\n"); + goto err; + } + + id0 = reqs[0]->message.id; + id1 = reqs[1]->message.id; + id2 = reqs[2]->message.id; #ifdef BSDEBUG fprintf(stderr, "ALLOC: %016llx %02x%02x %02x%02x %02x%02x %02x%02x\n", @@ -411,8 +766,26 @@ u64 allocblock_hint(void *block, u64 hint) { (unsigned int)((unsigned char *)block)[6], (unsigned int)((unsigned char *)block)[7]); #endif - + + free((void *)reqs[0]); + free((void *)reqs[1]); + free((void *)reqs[2]); return BSID(map, id0, id1, id2); + + err: + if (reqs[0]) { + dequeue(reqs[0]); + free((void *)reqs[0]); + } + if (reqs[1]) { + dequeue(reqs[1]); + free((void *)reqs[1]); + } + if (reqs[2]) { + dequeue(reqs[2]); + free((void *)reqs[2]); + } + return 0; } #else /* /BLOCKSTORE_REMOTE */ @@ -543,13 +916,13 @@ int __init_blockstore(void) int i; bsservers[0].hostname = "firebug.cl.cam.ac.uk"; - bsservers[1].hostname = "tetris.cl.cam.ac.uk"; - bsservers[2].hostname = "donkeykong.cl.cam.ac.uk"; - bsservers[3].hostname = "gunfighter.cl.cam.ac.uk"; - bsservers[4].hostname = "galaxian.cl.cam.ac.uk"; - bsservers[5].hostname = "firetrack.cl.cam.ac.uk"; - bsservers[6].hostname = "funfair.cl.cam.ac.uk"; - bsservers[7].hostname = "felix.cl.cam.ac.uk"; + bsservers[1].hostname = "planb.cl.cam.ac.uk"; + bsservers[2].hostname = "simcity.cl.cam.ac.uk"; + bsservers[3].hostname = NULL/*"gunfighter.cl.cam.ac.uk"*/; + bsservers[4].hostname = NULL/*"galaxian.cl.cam.ac.uk"*/; + bsservers[5].hostname = NULL/*"firetrack.cl.cam.ac.uk"*/; + bsservers[6].hostname = NULL/*"funfair.cl.cam.ac.uk"*/; + bsservers[7].hostname = NULL/*"felix.cl.cam.ac.uk"*/; bsservers[8].hostname = NULL; bsservers[9].hostname = NULL; bsservers[10].hostname = NULL; diff --git a/tools/blktap/blockstore.h b/tools/blktap/blockstore.h index 1f805daafe..45082d0d01 100644 --- a/tools/blktap/blockstore.h +++ b/tools/blktap/blockstore.h @@ -40,6 +40,7 @@ struct bshdr_t_struct { u32 operation; u32 flags; u64 id; + u64 luid; } __attribute__ ((packed)); typedef struct bshdr_t_struct bshdr_t; @@ -52,12 +53,13 @@ typedef struct bsmsg_t_struct bsmsg_t; #define MSGBUFSIZE_OP sizeof(u32) #define MSGBUFSIZE_FLAGS (sizeof(u32) + sizeof(u32)) -#define MSGBUFSIZE_ID (sizeof(u32) + sizeof(u32) + sizeof(u64)) +#define MSGBUFSIZE_ID (sizeof(u32) + sizeof(u32) + sizeof(u64) + sizeof(u64)) #define MSGBUFSIZE_BLOCK sizeof(bsmsg_t) #define BSOP_READBLOCK 0x01 #define BSOP_WRITEBLOCK 0x02 #define BSOP_ALLOCBLOCK 0x03 +#define BSOP_FREEBLOCK 0x04 #define BSOP_FLAG_ERROR 0x01 -- 2.30.2